package com.example.socket.filter.session;

import com.example.socket.core.Message;
import com.example.socket.core.Request;
import com.example.socket.core.Session;
import com.example.socket.core.SessionContext;
import com.example.socket.handler.Dispatcher;
import com.example.socket.handler.SnGenerator;
import com.example.socket.thread.DelayedElement;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;

import static com.example.socket.core.SessionKeys.*;
import static com.example.socket.filter.session.SessionEvent.Type;

/**
 * Session manager implementation of {@link SessionManager}; manages the creation and closing of sessions.
 *
 * @author frank
 */
public class NettySessionManager extends SessionEventDispacher implements SessionManager, NettySessionManagerMBean {

    private static final Logger logger = LoggerFactory.getLogger(NettySessionManager.class);

    /** Sessions whose user identity has been established. Key: user identity, Value: {@link Session}. */
    private ConcurrentHashMap<Object, Session> identities = new ConcurrentHashMap<>();
    /** Anonymous sessions. Key: {@link Session#getId()}, Value: {@link Session}. */
    private ConcurrentHashMap<Long, Session> anonymous = new ConcurrentHashMap<>();
    /** Identified sessions that have already been closed. Key: user identity, Value: {@link Session}. */
    private ConcurrentHashMap<Object, Session> passivates = new ConcurrentHashMap<>();

    /** Delayed-removal queue; created in {@link #initialize()} and consumed by the daemon thread started there. */
    private DelayQueue<DelayedElement<SessionElement>> removeQueue;

    // ----

    /** Sequence-number generator for pushed messages (shared by all connections). */
    private final SnGenerator generator = new SnGenerator();
    /** Push proxy factory; created in {@link #initialize()}. */
    private PushProxyFactory proxyFactory;
    /** Session factory; assigned through {@link #setSessionFactory(SessionFactory)}. */
    private SessionFactory<Channel> sessionFactory;

    /**
     * Initializes the manager.
     * <p>
     * Runs the parent initializer, creates the push proxy factory, starts the daemon thread
     * that consumes the delayed-removal queue, and registers this instance with the platform
     * MBean server. A failed MBean registration is only logged; it does not abort initialization.
     *
     * @throws Exception declared by the signature; presumably raised by the parent initializer
     */
    @PostConstruct
    public void initialize() throws Exception {
        super.initializer();
        // Push proxies
        proxyFactory = new PushProxyFactory(this, dispatcher);
        // Daemon thread performing the delayed session removal
        removeQueue = new DelayQueue<>();
        Thread thread = new Thread("会话延迟移除处理") {
            @Override
            public void run() {
                while (true) {
                    try {
                        DelayedElement<SessionElement> expired = removeQueue.take();
                        SessionElement element = expired.getContent();
                        long sessionId = element.getId();
                        Session session = sessionFactory.getSession(sessionId);
                        Object identity = element.getIdentity();
                        // Fall back to the passivated session when the factory no longer knows the id
                        if (session == null && identity != null) {
                            session = passivates.get(identity);
                        }
                        if (session == null) {
                            continue;
                        }
                        long dueTime = expired.getEnd().getTime();
                        // delayTime is configured in seconds while dueTime is in epoch milliseconds,
                        // so convert before subtracting (the previous code mixed the two units).
                        // Assumes Session#getLastTime() is also epoch milliseconds - TODO confirm.
                        long delayMillis = delayTime * 1000L;
                        // Timed out: no activity since the element was queued
                        if (dueTime - delayMillis >= session.getLastTime()) {
                            destorySession(session);
                        }
                    } catch (Exception e) {
                        // Keep the worker alive; one failed removal must not stop later ones
                        logger.error("会话延迟移除处理线程被非法打断", e);
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();

        // Register the MBean
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("com.example.socket:type=SessionFactoryMBean");
            mbs.registerMBean(this, name);
        } catch (Exception e) {
            logger.error("JMX", e);
        }
    }

    // ----

    /**
     * Obtains a push proxy of the given type for the supplied sessions from the proxy factory.
     *
     * @param clz      proxy type
     * @param sessions target sessions
     * @return the proxy produced by the proxy factory
     */
    @Override
    public <T> T getPushProxy(Class<T> clz, Session[] sessions) {
        return proxyFactory.getProxy(clz, sessions);
    }

    /**
     * Obtains a push proxy of the given type for a single session from the proxy factory.
     *
     * @param clz     proxy type
     * @param session target session
     * @return the proxy produced by the proxy factory
     */
    @Override
    public <T> T getPushProxy(Class<T> clz, Session session) {
        return proxyFactory.getProxy(clz, session);
    }


    /**
     * Obtains a push proxy of the given type for the session resolved from one identifier.
     * The identifier is resolved through {@code getIdSessions}.
     *
     * @param clz proxy type
     * @param ids a single identifier (or a {@link Session} instance)
     * @return the proxy produced by the proxy factory
     */
    @Override
    public <T> T getPushProxy(Class<T> clz, Object ids) {
        Session[] targets = getIdSessions(ids);
        return proxyFactory.getProxy(clz, targets);
    }

    /**
     * Obtains a push proxy of the given type for the sessions resolved from several identifiers.
     * The identifiers are resolved through {@code getIdSessions}.
     *
     * @param clz proxy type
     * @param ids identifiers (or {@link Session} instances)
     * @return the proxy produced by the proxy factory
     */
    @Override
    public <T> T getPushProxy(Class<T> clz, Object[] ids) {
        Session[] targets = getIdSessions(ids);
        return proxyFactory.getProxy(clz, targets);
    }

    /**
     * Obtains a push proxy that targets every identified session, both active and passivated.
     *
     * @param clz proxy type
     * @return the proxy produced for the union of {@code identities} and {@code passivates}
     */
    @Override
    public <T> T getPushAllProxy(Class<T> clz) {
        Set<Session> targets = new HashSet<>(identities.values());
        targets.addAll(passivates.values());
        Session[] asArray = targets.toArray(new Session[targets.size()]);
        return getPushProxy(clz, asArray);
    }

    // ----

    /**
     * Resolves identifiers to session objects.
     * <p>
     * {@code null} entries are skipped, entries that already are {@link Session} instances are
     * used as-is, and identifiers with no known session are represented by a
     * {@code FakeSession} created for that identifier.
     *
     * @param ids identifiers and/or sessions; a {@code null} array is treated as empty
     * @return the resolved sessions in input order, never {@code null}
     */
    private Session[] getIdSessions(Object... ids) {
        if (ids == null) {
            // Previously a NullPointerException; "no ids" simply resolves to "no sessions"
            return new Session[0];
        }
        List<Session> sessions = new ArrayList<>(ids.length);
        for (Object id : ids) {
            if (id == null) {
                continue;
            }
            if (id instanceof Session) {
                sessions.add((Session) id);
                continue;
            }
            Session session = getSession(id);
            if (session == null) {
                session = FakeSession.valueOf(id);
            }
            sessions.add(session);
        }
        // toArray already yields an empty array for an empty list
        return sessions.toArray(new Session[sessions.size()]);
    }

    /**
     * Sends a request to the sessions resolved from the given identifiers.
     * Does nothing when no identifiers are supplied or none resolve to a session.
     *
     * @param request request to send
     * @param ids     identifiers and/or sessions
     */
    @Override
    public void send(Request<?> request, Object... ids) {
        boolean nothingToResolve = ids == null || ids.length == 0;
        if (nothingToResolve) {
            return;
        }
        // Resolve the target sessions
        Session[] targets = getIdSessions(ids);
        if (targets.length != 0) {
            this.send(request, targets);
        }
    }

    /**
     * Encodes the request once and sends a copy of the encoded message to each session.
     * A request whose sequence number is 0 gets the next number from the shared generator first.
     *
     * @param request  request to send
     * @param sessions target sessions; {@code null} makes this a no-op
     */
    @Override
    public void send(Request<?> request, Session[] sessions) {
        if (sessions == null) {
            return;
        }
        // Assign a default SN when none has been set
        if (request.getSn() == 0) {
            int nextSn = generator.next();
            request.setSn(nextSn);
        }
        // Encode once, send one copy per session
        Message encoded = dispatcher.encodeRequest(request);
        for (int i = 0; i < sessions.length; i++) {
            dispatcher.send(encoded.copy(), sessions[i]);
        }
    }

    /**
     * Sends a request to every identified session (active and passivated).
     * Returns immediately when both maps are empty.
     *
     * @param request request to send
     */
    @Override
    public void sendAllIdentified(Request<?> request) {
        boolean nobodyIdentified = identities.isEmpty() && passivates.isEmpty();
        if (nobodyIdentified) {
            return;
        }

        Collection<Session> online = getOnlineSessions();
        Session[] targets = online.toArray(new Session[online.size()]);
        this.send(request, targets);
    }

    /**
     * Checks whether all given identities are online, i.e. present among the identified
     * or the passivated sessions.
     *
     * @param ids identities to check
     * @return {@code true} when every identity is known; {@code false} as soon as one is
     *         unknown or {@code null}
     */
    @Override
    public boolean isOnline(Object... ids) {
        for (Object id : ids) {
            // ConcurrentHashMap rejects null keys with a NullPointerException;
            // a null identity can never be online.
            if (id == null) {
                return false;
            }
            if (!identities.containsKey(id) && !passivates.containsKey(id)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns a fresh set with the identities of all active and passivated sessions.
     *
     * @return a new {@link HashSet} of identities
     */
    @Override
    public Collection<Object> getOnlineIdentities() {
        Set<Object> online = new HashSet<>(identities.keySet());
        online.addAll(passivates.keySet());
        return online;
    }

    /**
     * Returns a fresh set with all active and passivated identified sessions.
     *
     * @return a new {@link HashSet} of sessions
     */
    @Override
    public Collection<Session> getOnlineSessions() {
        Set<Session> online = new HashSet<>(identities.values());
        online.addAll(passivates.values());
        return online;
    }

    /**
     * Kicks the sessions bound to the given identities.
     *
     * @param cause kick cause, forwarded to {@link #kick(int, Session)}
     * @param ids   identities to kick
     * @return the identities for which a session was found and kicked
     */
    @Override
    public Collection<Object> kick(int cause, Object... ids) {
        Set<Object> kicked = new HashSet<>();
        for (Object id : ids) {
            Session target = getSession(id);
            if (target != null) {
                kick(cause, target);
                kicked.add(id);
            }
        }
        return kicked;
    }

    /**
     * Kicks a single session: marks it as kicked, destroys it and then closes it.
     *
     * @param cause   kick cause; a negative value additionally sets the ignore-event attribute
     * @param session session to kick
     */
    @Override
    public void kick(int cause, Session session) {
        // Mark the session state
        ATT_KICKED.setValue(session, true);
        if (cause < 0) {
            ATT_IGNORE_EVENT.setValue(session, true);
        }
        // Remove the kicked session first and close it afterwards, to avoid the uncertainty
        // of sessionClosed being triggered asynchronously
        // NOTE(review): 'cause' is not forwarded; destorySession fires the closed event with NORMAL
        destorySession(session);
        // Close the link
        session.close();
    }

    /**
     * Kicks every tracked session: identified, passivated and anonymous.
     *
     * @param cause kick cause, forwarded to {@link #kick(int, Session)}
     * @return the ids ({@link Session#getId()}) of the sessions that were kicked
     */
    @Override
    public Collection<Long> kickAll(int cause) {
        // The result used to be returned without ever being filled; record each kicked session id.
        Set<Long> kicked = new HashSet<>();
        for (Object id : identities.keySet()) {
            Session session = getSession(id);
            if (session == null) {
                continue;
            }
            kick(cause, session);
            kicked.add(session.getId());
        }
        for (Object id : passivates.keySet()) {
            Session session = getSession(id);
            if (session == null) {
                continue;
            }
            kick(cause, session);
            kicked.add(session.getId());
        }
        for (Session session : anonymous.values()) {
            kick(cause, session);
            kicked.add(session.getId());
        }
        // Drop whatever is still registered, as the original implementation did
        identities.clear();
        passivates.clear();
        anonymous.clear();
        return kicked;
    }

    /**
     * Looks up the session bound to an identity, checking active sessions first and
     * passivated sessions second.
     *
     * @param id user identity; may be {@code null}
     * @return the bound session, or {@code null} when the identity is {@code null} or unknown
     */
    @Override
    public Session getSession(Object id) {
        // ConcurrentHashMap.get(null) throws NullPointerException; a null identity has no session
        if (id == null) {
            return null;
        }
        Session result = identities.get(id);
        if (result != null) {
            return result;
        }
        return passivates.get(id);
    }

    /**
     * Replaces {@code src} with {@code dest}: kicks the session currently bound to the
     * identity of {@code src}, binds the context of {@code src} to {@code dest} and registers
     * {@code dest} as identified. Does nothing when {@code src} has no context.
     *
     * @param src  session being replaced
     * @param dest session taking over
     * @throws IllegalArgumentException when either argument is {@code null}
     */
    @Override
    public void replace(Session src, Session dest) {
        if (src == null || dest == null) {
            throw new IllegalArgumentException("复制源或目标对象不能为空");
        }
        SessionContext content = src.getContext();
        if (content == null) {
            return;
        }
        Object id = content.getIdentity();
        // A context without an identity has nothing to kick; looking up a null key in the
        // ConcurrentHashMap-backed registries would throw a NullPointerException.
        if (id != null) {
            kick(SessionEventCause.REPLACE, id);
        }
        if (logger.isInfoEnabled()) {
            logger.info("*** SESSION[{}]替代SESSION[{}], 绑定用户身份[{}] ***", new Object[]{dest.getId(), src.getId(), id});
        }
        // Copy the session content over to the destination
        ATT_KICKED.remove(src);
        dest.bindContext(content);

        // Register the session identity
        onIdentified(dest);
        // Set the "processed" marker
        ATT_PROCEED.setValue(dest, true);
        // NOTE(review): the source context is not actually unbound here (src.unbindContext() was
        // disabled), although the debug message below says so.
        if (logger.isDebugEnabled()) {
            logger.debug("清空被复制的源SESSION属性...");
        }

    }

    /**
     * Binds a user identity to a session.
     * <p>
     * If the identity is already bound to this very session, only an activated event is fired.
     * If it is bound to another session, that session is kicked first.
     *
     * @param session session to bind; {@code null} makes this a no-op
     * @param id      user identity; {@code null} makes this a no-op
     */
    @Override
    public void bind(Session session, Object id) {
        if (session == null || id == null) {
            return;
        }

        logger.info("*** SESSION[{}]绑定用户身份[{}] ***", session.getId(), id);

        Session existing = getSession(id);
        if (existing == session) {
            // Same session is already bound
            int cause = SessionEventCause.NORMAL;
            fireActivedEvent(id, cause, session);
            return;
        }
        if (existing != null) {
            // Force the currently bound session to log out
            kick(SessionEventCause.ENFORCE_LOGOUT, id);
        }

        // Record the identity on the session and register it
        SessionContext ctx = session.getContext();
        ctx.setIdentity(id);
        onIdentified(session);
        // Set the "processed" marker
        ATT_PROCEED.setValue(session, true);
    }

    /**
     * Counts the tracked sessions.
     *
     * @param includeAnonymous whether anonymous sessions are added to the total
     * @return number of identified plus passivated sessions, plus anonymous ones if requested
     */
    @Override
    public int count(boolean includeAnonymous) {
        int total = identities.size() + passivates.size();
        if (includeAnonymous) {
            total += anonymous.size();
        }
        return total;
    }

    // 监听的方法

    /**
     * Creates a new session for a connection and registers it as anonymous.
     *
     * @param channel connection the session is created for
     * @param creator passed through to the session factory
     * @return the newly created session
     */
    public Session createSession(Channel channel, SessionUtil creator) {
        Session session = sessionFactory.createSession(channel, creator);
        // Propagate the manager's short-connection flag to the session
        session.setTransitory(transitory);
        addSession(session);
        return session;
    }

    /**
     * Attaches a connection to the session with the given id.
     * <p>
     * In long-connection mode the session already associated with the channel is returned
     * unchanged. In short-connection mode the requested session is looked up; if it exists and
     * is a {@code TachSession}, the channel is attached to it and the session is re-activated,
     * destroying the channel's current session when it is a different one.
     *
     * @param channel connection to attach
     * @param sid     id of the session to attach to
     * @param first   forwarded to {@code TachSession#attach}
     * @return the session now associated with the channel; may be {@code null} when the channel
     *         has no session and the requested one cannot be used
     */
    public Session attach(Channel channel, long sid, boolean first) {
        Session current = lookup(channel);
        if (!transitory) {
            // Long connection: return directly
            return current;
        }
        if (current != null && current.getId() == sid) {
            return current;
        }
        // Find the requested session
        Session target = sessionFactory.getSession(sid);
        if (target == null) {
            logger.debug("请求附加的SESSION[{}]不存在, 使用当前SESSION[{}]", sid, current);
            return current;
        }
        if (!(target instanceof TachSession)) {
            return current;
        }
        if (current != null && target != current) {
            // Destroy the channel's current session
            destorySession(current);
        }
        // Attach the channel to the requested session
        ((TachSession) target).attach(channel, first);
        activeSession(target);
        return target;
    }

    /**
     * Moves an identified session into the active map. If a passivated entry existed for the
     * identity, the pending delayed-removal element is removed from the queue and an activated
     * event is fired. Sessions without an identity are ignored.
     *
     * @param session session to activate
     */
    private void activeSession(Session session) {
        Object identity = getIdentity(session);
        if (identity == null) {
            return;
        }
        Session passivated = passivates.remove(identity);
        identities.put(identity, session);
        if (passivated == null) {
            return;
        }
        // Probe element used to cancel the pending removal; matching depends on DelayedElement equality
        DelayedElement<SessionElement> probe = new DelayedElement<>(
                SessionElement.valueOf(session.getId(), identity), new Date());
        removeQueue.remove(probe);
        int cause = SessionEventCause.NORMAL;
        fireActivedEvent(identity, cause, session);
    }

    /**
     * Reads the session stored on a channel under {@code Session.SESSION_KEY}.
     *
     * @param channel channel to inspect
     * @return the value of the channel attribute (whatever {@code Attribute#get()} returns)
     */
    public static Session lookup(Channel channel) {
        io.netty.util.Attribute<Session> attr = channel.attr(Session.SESSION_KEY);
        return attr.get();
    }

    /**
     * Detaches a connection from a session; only acts in short-connection mode.
     *
     * @param session session to detach from
     * @param channel connection being detached
     */
    void detach(TachSession session, Channel channel) {
        if (transitory) {
            session.detach(channel);
        }
    }

    /**
     * Reacts to a session being opened.
     * @param session the newly created session
     * @param channel the connection the session was opened on
     */
    void onSessionOpened(Session session, Channel channel) {
        if (!transitory && session instanceof TachSession) {
            // Long-connection mode: attach the channel to the session right away
            ((TachSession) session).attach(channel, true);
        }
    }

    /**
     * Reacts to a session being closed: identified sessions are passivated in
     * short-connection mode, every other session is destroyed.
     * @param session the session that was closed
     */
    public void onSessionClosed(Session session) {
        Object identity = getIdentity(session);
        boolean keepForReattach = identity != null && transitory;
        if (keepForReattach) {
            // Short-connection mode: passivate the identified session
            passivateSession(session);
            return;
        }
        // Anonymous session or long-connection mode: destroy
        destorySession(session);
    }

    /**
     * Reacts to a session having been identified: moves it from the anonymous map to the
     * identified map and fires either a replaced event (when a different passivated session
     * existed for the identity) or an identified event. Sessions without identity are ignored.
     * @param session the identified session
     */
    void onIdentified(Session session) {
        Object identity = getIdentity(session);
        if (identity == null) {
            return;
        }
        anonymous.remove(session.getId());
        identities.put(identity, session);

        // Handle replacement of a passivated session
        Session passivated = passivates.remove(identity);
        boolean replacesOther = passivated != null && passivated != session;
        if (replacesOther) {
            fireReplacedEvent(identity, session, passivated);
        } else {
            fireIdentifiedEvent(identity, session);
        }
    }

    // 内部方法

    /**
     * Registers a session as anonymous, keyed by its id.
     *
     * @param session session to register
     */
    public void addSession(Session session) {
        anonymous.put(session.getId(), session);
        // Disabled: queueing anonymous sessions for delayed removal
        //addToQueue(session, Calendar.MILLISECOND, validity);
    }

    /**
     * Schedules a session for delayed removal, replacing any equal element already queued.
     *
     * @param session  session to schedule
     * @param timeType {@link Calendar} field the delay is expressed in
     * @param delay    amount added to the current time in that field
     */
    private void addToQueue(Session session, int timeType, int delay) {
        Calendar due = Calendar.getInstance();
        due.add(timeType, delay);
        Date dueTime = due.getTime();
        SessionContext ctx = session.getContext();
        Object identity = (ctx == null) ? null : ctx.getIdentity();
        DelayedElement<SessionElement> entry = new DelayedElement<>(SessionElement.valueOf(session.getId(), identity), dueTime);
        // Remove every equal element already queued before re-adding
        boolean removed = removeQueue.remove(entry);
        while (removed) {
            removed = removeQueue.remove(entry);
        }
        removeQueue.put(entry);
    }

    /**
     * Passivates a session: moves it from the anonymous/identified maps into the passivated
     * map, fires a passivated event and schedules it for delayed removal.
     *
     * @param session session to passivate; the only visible caller passes an identified session
     */
    private void passivateSession(Session session) {
        // Remove from the anonymous connections
        long sessionId = session.getId();
        anonymous.remove(sessionId);
        // Remove from the identified connections
        Object identity = getIdentity(session);
        identities.remove(identity);
        // Passivate the session
        passivates.put(identity, session);
        int cause = SessionEventCause.NORMAL;
        firePassivatedEvent(identity, cause, session);

        // Put the identified session into the delay queue (delayTime is in seconds)
        addToQueue(session, Calendar.SECOND, delayTime);
    }

    /**
     * Destroys a session: removes it from all registries, destroys it through the session
     * factory and fires a closed event with the NORMAL cause.
     *
     * @param session session to destroy
     */
    private void destorySession(Session session) {
        // Remove from the anonymous connections
        anonymous.remove(session.getId());
        // Remove from the identified connections
        Object identity = getIdentity(session);
        if (identity != null) {
            // Two-argument remove: only drops the entry if it still maps to this session
            identities.remove(identity, session);
            passivates.remove(identity, session);
        }
        // Destroy the session
        sessionFactory.destorySession(session);

        // Fire the CLOSE event (identity may be null for anonymous sessions)
        int cause = SessionEventCause.NORMAL;
        fireClosedEvent(identity, cause, session);
    }

    /**
     * Reads the identity stored in a session's context.
     *
     * @param session session to inspect
     * @return the context's identity, or {@code null} when the session has no context
     */
    private Object getIdentity(Session session) {
        SessionContext ctx = session.getContext();
        return (ctx == null) ? null : ctx.getIdentity();
    }

    // ---- SPRING BEAN -----

    /** Delay before a session is destroyed (unit: seconds). */
    private int delayTime = 0;
    /** Whether the manager runs in short-connection mode. */
    private boolean transitory = false;
    /** Communication dispatcher. */
    private Dispatcher dispatcher;

    /**
     * Sets the message dispatcher.
     * @param dispatcher dispatcher used to encode and send messages
     */
    public void setDispatcher(Dispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    /**
     * Sets the delay before a session is really removed.
     * @param delayTime delay (unit: seconds); 0 is allowed, negative values are rejected
     * @throws IllegalArgumentException when {@code delayTime} is negative
     */
    public void setDelayTime(int delayTime) {
        if (delayTime < 0) {
            // The message used to claim "less than or equal to 0", but 0 is accepted (and is the default)
            throw new IllegalArgumentException("延迟时间[" + delayTime + "]不能小于0");
        }
        this.delayTime = delayTime;
        logger.debug("*** SESSION 有效时间 [{}] 秒", delayTime);
    }

    /**
     * Tells whether a removal delay has been configured.
     * @return {@code true} when the delay time is greater than 0
     */
    private boolean hasDelayTimes() {
        return delayTime > 0;
    }

    /**
     * Sets whether this is a short-connection server.
     * @param transitory {@code true} for short-connection mode
     */
    public void setTransitory(boolean transitory) {
        this.transitory = transitory;
    }

    /**
     * Tells whether this is a short-connection server.
     * @return the short-connection flag
     */
    public boolean isTransitory() {
        return transitory;
    }

    /**
     * Sets the session listeners, grouped by event type.
     * @param listeners map from event type to listeners; stored in the {@code listeners} field,
     *                  which is not declared in this file (presumably inherited from the superclass)
     */
    public void setListeners(Map<Type, List<SessionListener>> listeners) {
        this.listeners = listeners;
    }

    /**
     * Sets the factory used to create, look up and destroy sessions.
     * @param sessionFactory session factory
     */
    public void setSessionFactory(SessionFactory<Channel> sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    // ---- MBEAN ----

    /** @return the number of sessions reported by the session factory's {@code getAll()} */
    @Override
    public int getTotalCounts() {
        return sessionFactory.getAll().size();
    }

    /** @return the number of active identified sessions */
    @Override
    public int getIdentitieCounts() {
        return identities.size();
    }

    /** @return the number of anonymous sessions */
    @Override
    public int getAnonymouCounts() {
        return anonymous.size();
    }

    /** @return the number of passivated sessions */
    @Override
    public int getPassivateCounts() {
        return passivates.size();
    }

    /** @return the session factory currently in use */
    public SessionFactory<Channel> getSessionFactory() {
        return sessionFactory;
    }

}